/**
 * 
 */
package com.ws.framework.remoteservice.core.consumer;

import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.zookeeper.CreateMode;

import com.google.common.collect.Lists;
import com.ws.framework.remoteservice.core.ex.WRSIException;
import com.ws.framework.remoteservice.core.model.ConsumerInfo;
import com.ws.framework.remoteservice.core.model.ProviderInfo;
import com.ws.framework.remoteservice.core.model.ServiceKey;
import com.ws.framework.remoteservice.core.protocal.Docker;
import com.ws.framework.remoteservice.core.protocal.DockerManager;
import com.ws.framework.remoteservice.core.provider.ServicePublisher;
import com.ws.framework.remoteservice.core.register.CuratorClient;
import com.ws.framework.remoteservice.core.util.InvocationIdUtils;
import com.ws.framework.remoteservice.core.util.NetUtils;
import com.ws.framework.remoteservice.core.util.WsProtocolParser;

/**
 * Singleton that registers this process as a consumer of a service by creating
 * an {@link CreateMode#EPHEMERAL} node through Curator, remembers every consumer
 * it registered, and re-creates those nodes when the Curator connection state
 * becomes {@link ConnectionState#RECONNECTED}.
 *
 * <p>
 * The remembered registrations live in a static map shared by the whole JVM.
 * Writes to the per-service lists and the snapshot taken for re-export are
 * guarded by the monitor of that map.
 *
 * @author WSH
 */
public enum PreconditionExecutor {

	instance;

	private static final Logger logger = Logger.getLogger(PreconditionExecutor.class.getName());

	/** Maximum number of retries after the first failed re-export attempt. */
	private static final int MAX_RETRY_TIMES = 30;

	/** Pause between two re-export attempts, in milliseconds. */
	private static final long RETRY_DELAY_MILLIS = 5000L;

	/** Consumers registered so far, grouped by the service they consume. */
	private static final ConcurrentMap<ServiceKey, List<ConsumerInfo>> localConsumerInfos = Maps.newConcurrentMap();

	private final CuratorFramework curatorClient = CuratorClient.get();

	/** Set once the reconnect listener has been attached; guarded by {@code this}. */
	private boolean watcherRegistered;

	/**
	 * Registers the current process as a consumer of the given service and makes
	 * sure the reconnect listener is installed.
	 *
	 * @param serviceKey
	 *            the service being consumed
	 * @throws WRSIException
	 *             if submitting the node creation fails
	 */
	public void registerConsumer(ServiceKey serviceKey) {
		export(serviceKey);
		addWatcher();
	}

	/**
	 * Creates one {@link Docker} per provider currently known to
	 * {@link DockerManager} for the given service and adds it to the manager.
	 *
	 * @param serviceKey
	 *            the service whose providers should get a docker
	 */
	public void createDockers(ServiceKey serviceKey) {
		for (ProviderInfo info : DockerManager.instance.getProviderInfos(serviceKey)) {
			DockerManager.instance.addDocker(serviceKey, new Docker(info));
		}
	}

	/**
	 * Asynchronously creates the ephemeral consumer node for the given service.
	 * When the background CREATE event arrives, the consumer is remembered so it
	 * can be exported again after a reconnect.
	 *
	 * @param serviceKey
	 *            the service being consumed
	 * @throws WRSIException
	 *             if submitting the creation request throws
	 */
	private void export(final ServiceKey serviceKey) {
		try {
			logger.info("Begin register service..");
			// NOTE(review): "appCode" looks like a placeholder application code — confirm.
			final ConsumerInfo info = new ConsumerInfo(InvocationIdUtils.processId, NetUtils.getLocalHost(), "appCode");
			curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
					.inBackground(new BackgroundCallback() {
						@Override
						public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
							// NOTE(review): the event's result code is not inspected, so a
							// failed CREATE is remembered as well — confirm this is intended.
							if (event.getType().equals(CuratorEventType.CREATE)) {
								remember(serviceKey, info);
							}
						}
					}).forPath(WsProtocolParser.consumerEncode(serviceKey.getContractName(), serviceKey.getImplCode(), info));
			logger.info("End register service..");
		} catch (Exception e) {
			logger.warning("Register service occur error. " + e.getMessage());
			throw new WRSIException("Export error. register the service occur exception. " + e.getMessage(), e);
		}
	}

	/**
	 * Records a registered consumer exactly once under its service key.
	 *
	 * @param serviceKey
	 *            the service being consumed
	 * @param info
	 *            the consumer that was registered
	 */
	private void remember(ServiceKey serviceKey, ConsumerInfo info) {
		synchronized (localConsumerInfos) {
			List<ConsumerInfo> infos = localConsumerInfos.get(serviceKey);
			if (null == infos) {
				// Start with an empty list; the single add below covers both the
				// new-list and the existing-list case.
				infos = Lists.newArrayList();
				localConsumerInfos.put(serviceKey, infos);
			}
			infos.add(info);
		}
	}

	/**
	 * Attaches, at most once, a connection-state listener that re-exports every
	 * remembered consumer when the connection state becomes RECONNECTED.
	 */
	private synchronized void addWatcher() {
		if (watcherRegistered) {
			// A second listener would re-export every consumer once more per reconnect.
			return;
		}
		curatorClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
			@Override
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				if (newState == ConnectionState.RECONNECTED) {
					logger.log(Level.INFO, "zookeeper重新链接成功, 重新export");
					for (Entry<ServiceKey, List<ConsumerInfo>> entry : localConsumerInfos.entrySet()) {
						// Copy under the writers' lock so a concurrent registration cannot
						// break the iteration; the lock is released before any retry sleep.
						List<ConsumerInfo> snapshot;
						synchronized (localConsumerInfos) {
							snapshot = Lists.newArrayList(entry.getValue());
						}
						for (ConsumerInfo info : snapshot) {
							exportWithRetry(entry.getKey(), info, 0);
						}
					}
				}
			}

		});
		watcherRegistered = true;
	}

	/**
	 * Submits the background creation of the ephemeral consumer node, retrying
	 * when the submission throws. Between attempts the calling thread sleeps
	 * {@link #RETRY_DELAY_MILLIS}; at most {@link #MAX_RETRY_TIMES} retries follow
	 * the first attempt. If the thread is interrupted while waiting, its interrupt
	 * flag is restored and retrying stops.
	 *
	 * @param serviceKey
	 *            the service being consumed
	 * @param info
	 *            the consumer to export
	 * @param index
	 *            number of the first attempt, used in log output and counted
	 *            against the retry limit
	 */
	private void exportWithRetry(ServiceKey serviceKey, ConsumerInfo info, int index) {
		for (int attempt = index;; attempt++) {
			try {
				curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground()
						.forPath(WsProtocolParser.consumerEncode(serviceKey.getContractName(),
								serviceKey.getImplCode(), info));
				return;
			} catch (Exception e) {
				logger.warning("Retry export service occur error,  index :" + attempt + " e: " + e.getMessage());
				if (attempt >= MAX_RETRY_TIMES) {
					// TODO notify the developers (e.g. by SMS); the ZooKeeper server may have crashed.
					return;
				}
				try {
					TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MILLIS);
				} catch (InterruptedException interrupted) {
					// Keep the interrupt visible to the caller and stop retrying.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
	}

	/**
	 * Forgets every remembered consumer, so nothing is re-exported on a later
	 * reconnect. Does not remove nodes or listeners.
	 */
	public void destroy() {
		localConsumerInfos.clear();
	}
}
